package xiaoa.java.spider.TeskHandle;



import java.net.URI;






import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import xiaoa.java.jms.rabbitMq.ProducerQueue;
import xiaoa.java.log.L;
import xiaoa.java.mongoDB.DbMgr;
import xiaoa.java.spider.bean.EsMqHtmlBean;
import xiaoa.java.spider.bean.HtmlBean;
import xiaoa.java.spider.db.vo.FetchUrl;
import xiaoa.java.spider.parse.ParseHtml;
import xiaoa.java.utils.ExceptionUtils;

/**
 * Crawler worker: takes {@link FetchUrl} tasks from an input queue, downloads and parses the page,
 * pushes the discovered child links to an output queue, sends the page content to a message queue
 * for indexing into ES, and records the task outcome via {@link DbMgr}.
 *
 * @author lkc
 * @date 2016年11月5日 下午3:10:35
 * @version V1.0
 *
 */
public class HandlerEsFetch implements Runnable {

	// Timeout passed to Jsoup when downloading a page.
	private static final int FETCH_TIMEOUT_MILLIS = (int) TimeUnit.MINUTES.toMillis(1);

	// ES index name attached to every outgoing message.
	private static final String ES_INDEX = "web";

	// ES type name attached to every outgoing message.
	private static final String ES_TYPE = "html";

	// Source of tasks to process.
	private final BlockingQueue<FetchUrl> inQueue;

	// Sink for newly discovered child URLs.
	private final BlockingQueue<FetchUrl> outQueue;

	// Number of tasks submitted since the last call to speed().
	private final AtomicInteger speedCount = new AtomicInteger(0);

	// Message producer used to ship pages for indexing.
	private final ProducerQueue producerQueue;

	/**
	 * Creates a worker bound to the given queues and opens the "fetchNode" producer queue.
	 *
	 * @param inQueue  queue tasks are taken from
	 * @param outQueue queue discovered child URLs are put into
	 * @throws Throwable if the producer queue cannot be created
	 */
	public HandlerEsFetch(BlockingQueue<FetchUrl> inQueue, BlockingQueue<FetchUrl> outQueue) throws Throwable {
		this.inQueue = inQueue;
		this.outQueue = outQueue;

		// Initialise the outgoing message queue.
		this.producerQueue = new ProducerQueue("fetchNode");
	}

	/**
	 * Takes one task, blocking until one is available.
	 * @Title: getTask
	 * @return the next task
	 * @throws Exception if interrupted while waiting
	 * @author xiaoa
	 */
	public FetchUrl getTask() throws Exception {
		return inQueue.take();
	}

	/**
	 * Puts every URL of the list into the output queue, blocking if it is full.
	 * @Title: addUrl
	 * @param urlList URLs to enqueue
	 * @throws Exception if interrupted while waiting for space
	 * @author xiaoa
	 */
	public void addUrlList(List<FetchUrl> urlList) throws Exception {
		for (FetchUrl url : urlList) {
			outQueue.put(url);
		}
	}

	/**
	 * Returns the number of tasks submitted since the previous call and resets the counter.
	 * The read and reset are a single atomic operation, so no concurrent increment is lost.
	 * @Title: speed
	 * @return tasks submitted since the last call
	 * @author xiaoa
	 */
	public int speed() {
		return speedCount.getAndSet(0);
	}

	/**
	 * Worker loop: processes tasks until the thread is interrupted or the input queue fails.
	 */
	@Override
	public void run() {
		Thread.currentThread().setName("handler_" + Thread.currentThread().getName());

		boolean bu = true;
		while (bu) {
			FetchUrl url;
			try {
				url = getTask();
			} catch (InterruptedException e) {
				// Interrupted while waiting for work: keep the interrupt status and stop.
				Thread.currentThread().interrupt();
				bu = false;
				break;
			} catch (Throwable e) {
				// The input queue itself failed; retrying would only spin, so stop.
				e.printStackTrace();
				bu = false;
				break;
			}

			if (processTask(url)) {
				// Interrupted mid-task (e.g. in outQueue.put): restore the flag and stop.
				Thread.currentThread().interrupt();
				bu = false;
			}
		}

		L.info("============== 线程死亡 threadName = " + Thread.currentThread().getName() + " bu = " + bu);
	}

	/**
	 * Fetches one page, enqueues its child links, sends it for indexing and records the outcome.
	 * Failures are recorded on the task rather than propagated.
	 *
	 * @param url the task to process; never null
	 * @return true if processing was interrupted and the worker should stop
	 */
	private boolean processTask(FetchUrl url) {
		// Timed from here so the recorded duration excludes time spent waiting on the queue.
		long startTime = System.currentTimeMillis();
		Throwable thro = null;
		boolean interrupted = false;
		try {
			URL pageUrl = new URL(url.getUrl());

			// Download and parse the page.
			Document doc = Jsoup.parse(pageUrl, FETCH_TIMEOUT_MILLIS);

			// Extract outgoing links and queue them as child tasks.
			List<String> urlList = ParseHtml.getUrls(doc, URI.create(url.getUrl()));
			addUrlList(buildChildUrls(url, urlList));

			// Ship the page content for indexing.
			doSaveEs(doc, pageUrl);
		} catch (Throwable e) {
			thro = e;
			interrupted = e instanceof InterruptedException;

			L.info("url = " + url.getUrl());

			e.printStackTrace();
		} finally {
			try {
				submitTask(url, startTime, System.currentTimeMillis(), thro);
			} catch (Throwable e) {
				e.printStackTrace();
			}
		}
		return interrupted;
	}

	/**
	 * Builds child tasks one level deeper than {@code parent} for each discovered link.
	 *
	 * @param parent the task whose page contained the links
	 * @param links  discovered link strings
	 * @return one new task per link, in the same order
	 */
	private static List<FetchUrl> buildChildUrls(FetchUrl parent, List<String> links) {
		List<FetchUrl> children = new ArrayList<>(links.size());
		for (String link : links) {
			FetchUrl child = new FetchUrl();
			child.setDepth(parent.getDepth() + 1);
			child.setUrl(link);
			child.setParent(parent.getUrl());
			child.setCreateTime(new Date());
			children.add(child);
		}
		return children;
	}

	/**
	 * Records the task result (duration, state, message) via DbMgr and bumps the speed counter.
	 * @Title: submitTask
	 * @param url       the processed task; ignored if null
	 * @param startTime processing start, epoch millis
	 * @param endTime   processing end, epoch millis
	 * @param thro      failure cause, or null on success
	 * @throws Throwable if the database update fails
	 * @author xiaoa
	 */
	private void submitTask(FetchUrl url, long startTime, long endTime, Throwable thro) throws Throwable {
		if (url == null) {
			return;
		}

		url.setUseTime((int) (endTime - startTime));
		url.setUpdateTime(new Date());
		if (thro == null) {
			url.setState(FetchUrl.STATE_SUCC);
			url.setMessage("处理成功");
		} else {
			url.setState(FetchUrl.STATE_FAIL);
			url.setMessage(ExceptionUtils.exceptionToString(thro));
		}

		DbMgr.update(url, url.getId(), FetchUrl.class);

		// Count this task towards the throughput figure reported by speed().
		speedCount.incrementAndGet();
	}

	/**
	 * Wraps the page in an {@link EsMqHtmlBean} and sends it through the producer queue.
	 * @Title: doSaveEs
	 * @param document parsed page; nothing is sent if null
	 * @param url      page URL
	 * @author xiaoa
	 */
	private void doSaveEs(Document document, URL url) throws Throwable {
		if (document == null) {
			return;
		}

		HtmlBean data = new HtmlBean();
		data.setHtml(document.html());
		data.setTitle(document.title());
		data.setText(document.text());
		data.setUrl(url.toString());
		data.setIndexTime(new Date());
		// NOTE(review): nanoTime-based ids can collide across worker threads/JVMs; consider a UUID
		// once the downstream id format is confirmed.
		data.setId(System.nanoTime() + "");

		EsMqHtmlBean bean = new EsMqHtmlBean();
		bean.setIndex(ES_INDEX);
		bean.setType(ES_TYPE);
		bean.setData(data);
		bean.setId(data.getId());

		// Send.
		producerQueue.doSendMessage(bean);
	}

}
